diff --git a/README.md b/README.md
index de21e5c..cb08f09 100644
--- a/README.md
+++ b/README.md
@@ -7,6 +7,8 @@
 ![coverage](https://img.shields.io/badge/Coverage-96-darkred)
 [![dependencies](https://img.shields.io/badge/Dependencies-1-blueviolet)](https://www.npmjs.com/package/hyparquet-writer?activeTab=dependencies)
 
+Hyparquet Writer is a JavaScript library for writing [Apache Parquet](https://parquet.apache.org) files. It is designed to be lightweight, fast and store data very efficiently. It is a companion to the [hyparquet](https://github.com/hyparam/hyparquet) library, which is a JavaScript library for reading parquet files.
+
 ## Usage
 
 Call `parquetWrite` with a list of columns, each column is an object with a `name` and `data` field. The `data` field should be an array of same-type values.
diff --git a/package.json b/package.json
index 9203afb..1d8744c 100644
--- a/package.json
+++ b/package.json
@@ -40,11 +40,11 @@
     "test": "vitest run"
   },
   "dependencies": {
-    "hyparquet": "1.10.2"
+    "hyparquet": "1.10.3"
   },
   "devDependencies": {
     "@babel/eslint-parser": "7.27.0",
-    "@types/node": "22.13.17",
+    "@types/node": "22.14.0",
     "@vitest/coverage-v8": "3.1.1",
     "eslint": "9.23.0",
     "eslint-plugin-jsdoc": "50.6.9",
diff --git a/src/write.js b/src/write.js
index 9758998..ed12ac2 100644
--- a/src/write.js
+++ b/src/write.js
@@ -6,74 +6,79 @@ import { getSchemaElementForValues } from './schema.js'
 /**
  * Write data as parquet to an ArrayBuffer
  *
- * @import {ColumnChunk, DecodedArray, FileMetaData, SchemaElement, SchemaTree} from 'hyparquet'
+ * @import {ColumnChunk, DecodedArray, FileMetaData, RowGroup, SchemaElement, SchemaTree} from 'hyparquet'
  * @import {KeyValue} from 'hyparquet/src/types.js'
  * @import {ColumnData} from '../src/types.js'
  * @param {object} options
  * @param {ColumnData[]} options.columnData
  * @param {boolean} [options.compressed]
+ * @param {number} [options.rowGroupSize]
  * @param {KeyValue[]} [options.kvMetadata]
  * @returns {ArrayBuffer}
  */
-export function parquetWrite({ columnData, compressed = true, kvMetadata }) {
+export function parquetWrite({ columnData, compressed = true, rowGroupSize = 100000, kvMetadata }) {
+  const num_rows = columnData.length ? BigInt(columnData[0].data.length) : 0n
   const writer = new Writer()
 
-  // Check if all columns have the same length
-  const num_rows = columnData.length ? BigInt(columnData[0].data.length) : 0n
-  for (const { data } of columnData) {
-    if (BigInt(data.length) !== num_rows) {
-      throw new Error('columns must have the same length')
-    }
-  }
-
-  // Write header PAR1
-  writer.appendUint32(0x31524150)
-
-  // schema
+  // construct schema
   /** @type {SchemaElement[]} */
   const schema = [{
     name: 'root',
     num_children: columnData.length,
   }]
-
-  // row group columns
-  /** @type {ColumnChunk[]} */
-  const columns = []
-
-  // Write columns
   for (const { name, data, type } of columnData) {
+    // check if all columns have the same length
+    if (BigInt(data.length) !== num_rows) {
+      throw new Error('columns must have the same length')
+    }
     // auto-detect type
     const schemaElement = getSchemaElementForValues(name, data, type)
     if (!schemaElement.type) throw new Error(`column ${name} cannot determine type`)
-    const file_offset = BigInt(writer.offset)
-    /** @type {SchemaElement[]} */
-    const schemaPath = [
-      schema[0],
-      schemaElement,
-    ]
-    const meta_data = writeColumn(writer, schemaPath, data, compressed)
-
-    // save metadata
     schema.push(schemaElement)
-    columns.push({
-      file_path: name,
-      file_offset,
-      meta_data,
+  }
+
+  // write header PAR1
+  writer.appendUint32(0x31524150)
+
+  /** @type {RowGroup[]} */
+  const row_groups = []
+  for (let i = 0; i < num_rows; i += rowGroupSize) {
+    const groupStart = writer.offset
+
+    // row group columns
+    /** @type {ColumnChunk[]} */
+    const columns = []
+
+    // write columns
+    for (let i = 0; i < columnData.length; i++) {
+      const { name, data } = columnData[i]
+      const file_offset = BigInt(writer.offset)
+      const schemaPath = [schema[0], schema[i + 1]]
+      const meta_data = writeColumn(writer, schemaPath, data, compressed)
+
+      // save metadata
+      columns.push({
+        file_path: name,
+        file_offset,
+        meta_data,
+      })
+    }
+
+    row_groups.push({
+      columns,
+      total_byte_size: BigInt(writer.offset - groupStart),
+      num_rows: BigInt(Math.min(rowGroupSize, Number(num_rows) - i)),
     })
   }
 
-  // Write metadata
+  // write metadata
   /** @type {FileMetaData} */
   const metadata = {
     version: 2,
     created_by: 'hyparquet',
     schema,
     num_rows,
-    row_groups: [{
-      columns,
-      total_byte_size: BigInt(writer.offset - 4),
-      num_rows,
-    }],
+    row_groups,
     metadata_length: 0,
     key_value_metadata: kvMetadata,
   }
@@ -81,7 +86,7 @@ export function parquetWrite({ columnData, compressed = true, kvMetadata }) {
   delete metadata.metadata_length
   writeMetadata(writer, metadata)
 
-  // Write footer PAR1
+  // write footer PAR1
   writer.appendUint32(0x31524150)
 
   return writer.getBuffer()
diff --git a/test/write.test.js b/test/write.test.js
index e4ee465..2bd5d3f 100644
--- a/test/write.test.js
+++ b/test/write.test.js
@@ -172,6 +172,22 @@ describe('parquetWrite', () => {
     expect(result[5].double).not.toEqual(0)
   })
 
+  it('splits row groups', async () => {
+    const data = Array(200).fill(13)
+    const file = parquetWrite({ columnData: [{ name: 'int', data }], rowGroupSize: 100 })
+    const metadata = parquetMetadata(file)
+    expect(metadata.row_groups.length).toBe(2)
+    expect(metadata.row_groups[0].num_rows).toBe(100n)
+    expect(metadata.row_groups[1].num_rows).toBe(100n)
+    // round trip
+    const result = await parquetReadObjects({ file })
+    expect(result.length).toBe(200)
+    expect(result[0]).toEqual({ int: 13 })
+    expect(result[99]).toEqual({ int: 13 })
+    expect(result[100]).toEqual({ int: 13 })
+    expect(result[199]).toEqual({ int: 13 })
+  })
+
   it('throws for wrong type specified', () => {
     expect(() => parquetWrite({ columnData: [{ name: 'int', data: [1, 2, 3], type: 'BOOLEAN' }] }))
       .toThrow('parquet cannot write mixed types')
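For reference, here is a minimal usage sketch of the `rowGroupSize` option introduced in this diff, mirroring the new test. The import specifiers (package entry points for `hyparquet-writer` and `hyparquet`) are assumptions; the `parquetWrite`, `parquetMetadata`, and `parquetReadObjects` calls follow the signatures shown above.

```js
// Sketch only: 250,000 rows written with rowGroupSize = 100,000
// (the new default), which yields row groups of 100k, 100k, and 50k rows.
import { parquetWrite } from 'hyparquet-writer' // assumed entry point
import { parquetMetadata, parquetReadObjects } from 'hyparquet'

const data = Array.from({ length: 250000 }, (_, i) => i)
const file = parquetWrite({
  columnData: [{ name: 'id', data }],
  rowGroupSize: 100000, // optional; matches the new default
})

// inspect the row group layout recorded in the footer
const metadata = parquetMetadata(file)
console.log(metadata.row_groups.map(g => g.num_rows)) // [100000n, 100000n, 50000n]

// round trip through hyparquet
const rows = await parquetReadObjects({ file })
console.log(rows.length) // 250000
```

Smaller `rowGroupSize` values produce more, smaller row groups, which lets a reader fetch and decode less data per group at the cost of a slightly larger footer.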